SFTPコネクタの転送完了時に自動的にAthenaによるクエリを実行し転送結果の集計をメールで受信してみる
初めに
先日Amazon Transfer Familyの各種イベントがEventBridgeに発行できるようになり、こちらでSFTPコネクタの転送完了を検知し後続の処理を自動的に解することができるようになりました。
SFTPコネクタはコマンドを実行することでAWS側のマネージドなSFTPクライアントがSFTPサーバとS3間のファイル転送を行う機能となりますが、
転送処理は非同期的に実行されるためコマンドの完了=ファイルの転送完了とならないため別の手段で完了を把握する必要があります。
例えば以下では1GBほどのtext1.txt
というファイルを作成し実行しそのままs3 ls
を実行していますがこのタイミングではまだ完了していないことが確認できます。
$ aws transfer start-file-transfer --connector-id c-xxxxxx --retrieve-file-paths /home/sftpuser/test1.txt /home/sftpuser/text1.txt --local-directory-path /bucketname/receive && aws s3 ls s3://bucketname/receive/ { "TransferId": "ff8b19a3-5d42-4f85-bbc8-fadba2355e3b" } 2024-02-26 13:09:23 16 test1.txt 2024-02-26 12:50:35 0 test2.txt
この関係で以前は定期的に状態を確認する必要があるため実行時間や基盤によっては確認回数が増え余計なコストがかかってしまったり、ポーリング間隔を長くする場合は余分な待ち時間が生まれてしまいましたが、今回のアップデートによりイベント駆動に持ち込み実行を最適化できる余地が生まれました。
一応正常系であればSFTPサーバからAmazon S3方面の転送であればAmazon S3に対してのPutObjectイベント駆動でも正常系は処理できますが、転送に失敗した場合は失敗しているのかまだ最中であるのかは別の手段が必要となります。
今回はコチラのイベントを利用しデイリーバッチ等でCSVがSFTPサーバからS3に送られるような環境でその転送完了時にデータを集計しユーザに通知する仕組みを作ってみます
全体像
大枠の構成は以下の通りです。
詳細は後述しますが転送の初期化および実行をするステートマシン、転送状態を確認し必要に応じAthenaを起動するステートマシン、Athenaの完了を通知するステートマシンの3層で構成されています。
実のところ構想時はもっとシンプルになる予定だったのですが思ったのとちょっと違う、このパターンまずいみたいのがあったりしてややリッチになりました。
この構成でも正常パターンのみを考えてもイベントの発生タイミング次第でAthenaの起動が抜ける等まだ足りないとは思いますがひとまず大枠は見れるのでそういったパターンは今回見逃しています。
各ステートマシンについて
転送の初期化・実行
こちらのステートマシンは以下のようなフローで構成されています。
今回は実行の入力のファイルについては実行時に手で入力する形にしています。
後述のフェーズに一部関わるのですがSFTPコネクタによる完了イベントは1コマンド=1イベントでまとまっているわけではなくファイル単位で管理されているようです。
実際にtest1.txt
およびtest2.txt
を同時に指定した場合の完了イベントをSNS経由で受信したところ以下の通りとなっておりました。(test2.txt
に対し同等のものが別途もう1つ)
{ "version": "0", "id": "xxxxx", "detail-type": "SFTP Connector File Retrieve Completed", "source": "aws.transfer", "account": "xxxxx", "time": "2024-02-26T04:09:23Z", "region": "ap-northeast-1", "resources": [ "arn:aws:transfer:ap-northeast-1:xxxxx:connector/c-xxxxx" ], "detail": { "operation": "RETRIEVE", "connector-id": "c-xxxxx", "transfer-id": "xxxxx", "file-transfer-id": "xxxxx", "url": "sftp://xxx.xxx.xxx.xxx", "file-path": "/home/sftpuser/test1.txt", "status-code": "COMPLETED", "local-directory-path": "/xxxxxx/receive", "bytes": 16, "start-timestamp": "2024-02-26T04:09:22.931482Z", "end-timestamp": "2024-02-26T04:09:22.941393Z", "local-file-location": { "domain": "S3", "bucket": "xxxxxx", "key": "receive/test1.txt" } } }
そのため複数のファイルを転送する場合はこれらの通知を集約し状態を管理する何かを用意した上で全て出揃った後にクエリを開始する必要が出てきました。
今回は費用面と利便性からDynamoDBを利用し実行前に全てそちらに未開始ステータスでデータを投入し参照することで管理するようにしております。
なお気時点ではStartTransferFile
1コマンドで転送可能なファイル数は5つかつSFTPコネクタの同時の実行数制限自体も10となっております。
(2023/03/05追記)理解に誤りがありサーバあたりのコネクションは1000まで行けるようです。SSH側の仕組みで1コネクションで複数接続が張れる?機能があるらしくそちらが最大10で実行数自体は1000まで行けるのでしょうか。
今回はその制限を超えることないように実行していますが、実際のワークロードであれば超えるケースの方が多いかと思いますのでそういった場合は一旦SQSに投入し完了イベントで呼び出されるステートマシンで次の処理を始めるような構成が必要となってきます。
Athena起動
こちらのステートマシンは以下のようなフローで構成されています。
完了に応じ先ほどDynamoDBに投入したデータを更新し未完了のものが1つも得られなくなれば(Count=0)Athenaが実行されるようになっております。 今回はシンプルにCountでデータ数を取るのみの処理です。 細かい同時実行制御をしていないので実行タイミングと結果整合性の特性が合わさるともしかするとAthena実行をせずに終了するパターンが出そうな気はします。
後処理としてDynamoDBのデータを消すようなことを今回していないのでどこかでクリーンアップを行わないと無限にデータが溜まり続けます。
通知
こちらのステートマシンは以下のようなフローで構成されています。
StartQueryExecution
もStartTransferFile
同様非同期的に実行されるAPIとなっております。
今回はせっかくSFTPコネクタ側をイベント駆動にしてこちらが定期的に確認というのも違和感がありますので完了をトリガーに起動するステートマシンを別に作成し分離します。
結果についてはAthena側でGetQueryResults
を実行することで別途S3にアクセスせずとも取得は可能でしたがJSONで階層化されており何となく見栄えが好きではなかったのでイベント内に含まれる設置先の情報を利用し別途S3側から取得し通知しています。
ちなみにGetQueryResults
で得られた結果のデータ部は以下のような形となります。
{ ... "Rows": [ { "Data": [ { "VarCharValue": "DataCount" } ] }, { "Data": [ { "VarCharValue": "0" } ] } ], ... }
。
テンプレート
今回はSAMを利用して構築しており、テンプレートは以下に格納しております。
SFTPコネクタ自体とSFTPサーバ部分は含まれていないため別途構築が必要となります。
実行
準備として転送用のTSVファイルをSFTPサーバ上に設置しておきます。
(中央の値は対応するシーケンス番号のハッシュ値です)
$ pwd /home/sftpuser $ cat sec1.csv 1 c4ca4238a0b923820dcc509a6f75849b 2022-11-25 08:58:23.151356 2 c81e728d9d4c2f636f067f89cc14862c 2022-11-25 08:58:23.151356 3 eccbc87e4b5ce2fe28308fd9f2a7baf3 2022-11-25 08:58:23.151356 4 a87ff679a2f3e71d9181a67b7542122c 2022-11-25 08:58:23.151356 5 e4da3b7fbbce2345d7772b0674a318d5 2022-11-25 08:58:23.151356 6 1679091c5a880faf6fb5e6087eb1b2dc 2022-11-25 08:58:23.151356 7 8f14e45fceea167a5a36dedd4bea2543 2022-11-25 08:58:23.151356 8 c9f0f895fb98ab9159f51fd0297e236d 2022-11-25 08:58:23.151356 9 45c48cce2e2d7fbdea1afc51c7c6ad26 2022-11-25 08:58:23.151356 10 d3d9446802a44259755d38e6d163e820 2022-11-26 09:00:24.923556 $ cat sec2.csv 11 6512bd43d9caa6e02c990b0a82652dca 2022-11-25 08:58:23.151356 12 c20ad4d76fe97759aa27a0c99bff6710 2022-11-25 08:58:23.151356 13 c51ce410c124a10e0db5e4b97fc2af39 2022-11-25 08:58:23.151356 14 aab3238922bcc25a6f606eb525ffdc56 2022-11-25 08:58:23.151356 15 9bf31c7ff062936a96d3c8bd1f8f2ff3 2022-11-25 08:58:23.151356 16 c74d97b01eae257e44aa9d5bade97baf 2022-11-25 08:58:23.151356 17 70efdf2ec9b086079795c442636b55fb 2022-11-25 08:58:23.151356 18 6f4922f45568161a8cdf4ad2299f6d23 2022-11-25 08:58:23.151356 19 1f0e3dad99908345f7439f8ffabdffc4 2022-11-25 08:58:23.151356 20 98f13708210194c475687be6106a3b84 2022-11-25 08:58:23.151356
最初のステートマシンをマネジメントコンソールから起動します。
引数として先ほどの2つのファイルのフルパスを指定し起動します。
起動して少し待つとデータの転送が完了し以下のように今回実行したSQL(SELECT COUNT(*) AS DataCount FROM sample_table
)の結果がBODYに格納されたJSONがSNSで通知されます。
フォーマットの関係上見やすさに関してはGetQueryResults
とそこまで大差はないですが改行文字を実際の改行にしてしまえば通常のTSVのみで見やすいはずです。
(見やすさより安全のためにbase64でエンコードした方が良い気もしますが)
{"AcceptRanges":"bytes","ContentLength":17,"ContentType":"application/octet-stream","ETag":"\"8e3409564bab85e98b6829cbde2b4d0e\"","LastModified":"2024-03-01T14:23:42Z","Metadata":{},"ServerSideEncryption":"AES256","Body":"\"DataCount\"\n\"20\"\n"}
経過を見るとsec2.csvが先に完了しその後にsec1.csvが完了しそちらの処理でAthenaを実行しているようです。
終わりに
今回はSFTPコネクタの転送完了イベントを活用した簡単な(?)サンプルを作ってみました。
実際組み上げてみるとSFTPサーバを除けばマネージドでサーバの管理が不要、Lambda関数等でプログラムを書く必要はなくEventBridgeやStepFunctionsの利用で所謂ローコード的に構築が可能、イベント駆動のため余分なチェック処理も走らず従量課金のサービスは最低限のみの請求
と非常にクラウドネイティブな良い感じのサービスで完結することができました。
サイズが不定で大容量のものも混じってると定期的なチェックがのオーバーヘッドの削減、(今回は組み込んでいませんが)イベント駆動を活かしエラー発生時に即時に通知やリカバリ処理の復帰などそう言った面でメリットは感じられてくるかと思います。
逆に一定のファイルサイズで各ファイル自体も小容量であれば定期的なチェックの方が呼び出し回数が結果として減り、制御もシンプルになるかと思いますで一概にこの方式が良いとはいえずケースバイケースとはなります。
今回のアップデートでSFTPコネクタ周辺でイベント駆動の処理が可能とはなりましたが状況により最適な処理方式は変わってきますのでこの場合は何が良いのだろうかという選択肢の一つとしてぜひ色々考えていただければと思います。